Example: Calculating Pi

This example explores how using the Ray API enables horizontal scalability.

import ray
import logging
import time

# Start Ray. If you're connecting to an existing cluster, you would use ray.init(address=<cluster-address>) instead.
ray.init(
    num_cpus=4,
    num_gpus=1,
    ignore_reinit_error=True,
    logging_level=logging.ERROR,
)
ray.cluster_resources()                    # get the cluster resources
{'GPU': 1.0,
 'object_store_memory': 506860339.0,
 'accelerator_type:GT': 1.0,
 'node:192.168.50.185': 1.0,
 'CPU': 4.0,
 'memory': 1013720679.0}

Note

There is a lot of Python code used in this notebook, both for calculating Pi and for the graphs. We won’t show most of it, but all the code can be found in this directory: pi_calc.py (calculating \(\pi\)), task_lesson_util.py (graphics), and bokeh_util.py (plotting utilities).

from task_lesson_util import make_dmaps, run_simulations, stop_simulations
from pi_calc import str_large_n
import numpy as np
import math, statistics, time
from bokeh_util import two_lines_plot, means_stddevs_plot  # Some plotting utilities in `./bokeh_util.py`.
from bokeh.plotting import show, figure
from bokeh.layouts import gridplot

Let’s estimate \(\pi\) (~3.14159) using a Monte Carlo technique, where we randomly sample from a uniform distribution, one with equal probability of picking any point in a square.

It works like this. Imagine each blue square is a 2-meter by 2-meter piece of paper you put on a wall. The circle inside each one has a radius of 1 meter.

Now suppose you throw \(N\) darts at each paper. The three plots use \(N = {\sim}1000, {\sim}10000, {\sim}100000\). (This will be hard on your wall, so don’t try this at home…)

Some darts will land inside the circle, call them \(n\), and the rest will land outside, \(N-n\). The area of a circle is \({\pi}r^{2}\) and the area of the enclosing square is \((2r)^{2} = 4r^{2}\). The ratio \(n/N\) approximately equals the ratio of the circle’s area to the square’s area, \({\pi}r^{2}/4r^{2} = {\pi}/4\). (Does it make sense that this ratio is independent of the actual radius value?)

In other words,

\(\pi/4 \approx n/N\)

\(\pi \approx 4n/N\)

So, to approximate \(\pi\), we can count the number of darts thrown and the number that land inside the circle.
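A minimal pure-Python sketch of the counting described above, using only the standard library’s random module. The function name estimate_pi_pure_python and its seed parameter are illustrative assumptions, not code from pi_calc.py; the notebook’s own NumPy-based estimate_pi is defined later.

```python
import random

def estimate_pi_pure_python(num_darts, seed=None):
    """Hypothetical helper: throw num_darts uniformly random darts at a 2x2 square
    centered on the origin and count how many land inside the unit circle."""
    rng = random.Random(seed)          # local generator, so a seed makes runs reproducible
    inside = 0                         # n: darts that land inside the circle
    for _ in range(num_darts):         # N: total darts thrown
        x = rng.uniform(-1.0, 1.0)
        y = rng.uniform(-1.0, 1.0)
        if x * x + y * y <= 1.0:       # inside (or on) the circle of radius 1
            inside += 1
    return 4.0 * inside / num_darts    # pi ~ 4n/N

for N in (1000, 10000, 100000):
    print(N, estimate_pi_pure_python(N, seed=42))
```

Because this loop handles one dart at a time in the interpreter, it is much slower than a vectorized NumPy implementation for large \(N\).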

Ns = [1000, 10000, 100000]

dmaps = make_dmaps(Ns)

dmaps[0] + dmaps[1] + dmaps[2]
run_simulations(dmaps)
# TIP: If you want to stop them, uncomment and run the next line:
# stop_simulations(dmaps)

You probably noticed three things while the simulations were running or after they finished:

The accuracy improved for larger \(N\)… well usually. Sometimes a lower \(N\) simulation gets “lucky” and does as well as a higher \(N\). In a real experiment, we would do many runs and then compute the average and standard deviation. (We’ll do that below.)

Because each \(N\) is 10 times the \(N\) to the left, it took roughly 10 times as long for the second to finish compared to the first, etc.

The updates in the second and third simulations appeared to go faster as the neighbors to the left finished.

What this means is that if we really want a good estimate of \(\pi\), we have to do runs with large \(N\), but then we wait longer. Ideally, to get fast and accurate results, we would do as much work as possible in parallel, leveraging all the CPU cores available on our machine … or our cluster.

Let’s use Ray to achieve this.

Parallelism with Ray

We did the previous calculation without fully exploiting all available cores. In a cluster, the rest of the cores on the rest of the machines would be idle, too.

We can use Ray to parallelize a lot of this work.

num_workers = 8
trials = 20

Let’s define a function for the Pi calculation that is simpler than the code used above, which also had to support the graphs. We won’t draw the “dart graphs” from now on, because they add a lot of overhead that would obscure the performance results.

This function estimates \(\pi\) for the number of samples requested. It uses NumPy. If you’re not familiar with it, the implementation details aren’t essential to understand, but the comments try to explain them.

def estimate_pi(num_samples):
    xs = np.random.uniform(low=-1.0, high=1.0, size=num_samples)   # Generate num_samples random samples for the x coordinate.
    ys = np.random.uniform(low=-1.0, high=1.0, size=num_samples)   # Generate num_samples random samples for the y coordinate.
    xys = np.stack((xs, ys), axis=-1)                              # Like Python's "zip(a,b)"; creates np.array([(x1,y1), (x2,y2), ...]).
    inside = xs*xs + ys*ys <= 1.0                                  # Creates a predicate over all the array elements.
    xys_inside = xys[inside]                                       # Selects only those "zipped" array elements inside the circle.
    in_circle = xys_inside.shape[0]                                # Count the number of points inside the circle.
    approx_pi = 4.0*in_circle/num_samples                          # The Pi estimate.
    return approx_pi
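The np.stack and boolean-indexing steps in estimate_pi exist only to count the points inside the circle, so a leaner variant can count the boolean mask directly with np.count_nonzero. This is a sketch, not the version in pi_calc.py: estimate_pi_lean is a hypothetical name, and it uses NumPy’s Generator API (np.random.default_rng) so that results can be seeded.

```python
import numpy as np

def estimate_pi_lean(num_samples, rng=None):
    """Hypothetical variant of estimate_pi: same Monte Carlo estimate,
    but without building the stacked (x, y) array."""
    rng = np.random.default_rng() if rng is None else rng
    xs = rng.uniform(-1.0, 1.0, size=num_samples)            # x coordinates
    ys = rng.uniform(-1.0, 1.0, size=num_samples)            # y coordinates
    in_circle = np.count_nonzero(xs * xs + ys * ys <= 1.0)   # n: points inside the circle
    return 4.0 * in_circle / num_samples                     # pi ~ 4n/N

print(estimate_pi_lean(1_000_000, rng=np.random.default_rng(0)))
```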

Let’s try it:

Ns = [10000, 50000, 100000, 500000, 1000000] #, 5000000, 10000000]  # Larger values take a long time on small VMs and machines!
maxN = Ns[-1]
maxN
1000000
fmt = '{:10.5f} seconds: pi ~ {:7.6f}, stddev = {:5.4f}, error = {:5.4f}%'
def try_it(n, trials):
    print('trials = {:3d}, N = {:s}: '.format(trials, str_large_n(n, padding=12)), end='')   # str_large_n imported above.
    start = time.time()
    pis = [estimate_pi(n) for _ in range(trials)]
    approx_pi = statistics.mean(pis)
    stdev = statistics.stdev(pis)
    duration = time.time() - start
    error = (100.0*abs(approx_pi-np.pi)/np.pi)
    print(fmt.format(duration, approx_pi, stdev, error))
    return trials, n, duration, approx_pi, stdev, error

The next cell will take a few seconds to run.

Note

If all the following trials finish in under a few seconds for the largest \(n\) value in \(Ns\) and the largest number of trials, consider changing \(Ns\) above to add larger values.

data_ns = [try_it(n, trials) for n in Ns]
trials =  20, N =        10000:    0.42149 seconds: pi ~ 3.141980, stddev = 0.0146, error = 0.0123%
trials =  20, N =        50000:    1.15373 seconds: pi ~ 3.141680, stddev = 0.0069, error = 0.0028%
trials =  20, N =       100000:    1.30460 seconds: pi ~ 3.141172, stddev = 0.0041, error = 0.0134%
trials =  20, N =       500000:    3.79476 seconds: pi ~ 3.141681, stddev = 0.0017, error = 0.0028%
trials =  20, N =      1000000:    5.23905 seconds: pi ~ 3.141898, stddev = 0.0012, error = 0.0097%
data_trials = [try_it(maxN, trials) for trials in range(5,20,2)]
trials =   5, N =      1000000:    1.25038 seconds: pi ~ 3.141445, stddev = 0.0012, error = 0.0047%
trials =   7, N =      1000000:    1.80617 seconds: pi ~ 3.142010, stddev = 0.0018, error = 0.0133%
trials =   9, N =      1000000:    2.41817 seconds: pi ~ 3.141034, stddev = 0.0016, error = 0.0178%
trials =  11, N =      1000000:    2.86020 seconds: pi ~ 3.142263, stddev = 0.0010, error = 0.0213%
trials =  13, N =      1000000:    3.44123 seconds: pi ~ 3.141651, stddev = 0.0020, error = 0.0018%
trials =  15, N =      1000000:    4.38225 seconds: pi ~ 3.141940, stddev = 0.0015, error = 0.0111%
trials =  17, N =      1000000:    4.12695 seconds: pi ~ 3.141848, stddev = 0.0018, error = 0.0081%
trials =  19, N =      1000000:    4.69548 seconds: pi ~ 3.141592, stddev = 0.0014, error = 0.0000%

(We’ll graph the results below.)

The CPU utilization never gets close to 100%. On a four-core machine, for example, the number will be about 25%. (The Ray process meters will stay at or near zero until later in this notebook.)

So, this runs on one core, while the other cores are idle. Now we’ll try with Ray.

From Python Functions to Ray Tasks

You create a Ray task by decorating a normal Python function with @ray.remote. These tasks will be scheduled across your Ray cluster (or your laptop’s CPU cores).

Here is a Ray task for estimate_pi. All we need is a wrapper around the original function.

@ray.remote
def ray_estimate_pi(num_samples):
    return estimate_pi(num_samples)

Let’s try it. To invoke a task, you use function.remote(args). This call is asynchronous: it immediately returns a future, called an ObjectRef, that is used to access the value when the function completes. We use ray.get(ref) to get the value, which blocks until it is available.

ref = ray_estimate_pi.remote(100)
print(ray.get(ref))
3.16

We can also work with a list of refs:

refs = [ray_estimate_pi.remote(n) for n in [100, 1000, 10000]]
print(ray.get(refs))
[3.0, 3.156, 3.1456]

Let’s try our test run again with our Ray task. We’ll need a new “try it” function, because of the different task invocation logic. This function doesn’t need to be a Ray task, however, so no @ray.remote decorator is required.

def ray_try_it(n, trials):
    print('trials = {:3d}, N = {:s}: '.format(trials, str_large_n(n, padding=12)), end='')   # str_large_n imported above.
    start = time.time()
    refs = [ray_estimate_pi.remote(n) for _ in range(trials)]
    pis = ray.get(refs)
    approx_pi = statistics.mean(pis)
    stdev = statistics.stdev(pis)
    duration = time.time() - start
    error = (100.0*abs(approx_pi-np.pi)/np.pi)
    print(fmt.format(duration, approx_pi, stdev, error))
    return trials, n, duration, approx_pi, stdev, error
ray_data_ns = [ray_try_it(n, trials) for n in Ns]
trials =  20, N =        10000:    0.04690 seconds: pi ~ 3.144740, stddev = 0.0212, error = 0.1002%
trials =  20, N =        50000:    0.09693 seconds: pi ~ 3.144104, stddev = 0.0080, error = 0.0799%
trials =  20, N =       100000:    0.28484 seconds: pi ~ 3.143882, stddev = 0.0048, error = 0.0729%
trials =  20, N =       500000:    0.89003 seconds: pi ~ 3.142444, stddev = 0.0015, error = 0.0271%
trials =  20, N =      1000000:    2.03504 seconds: pi ~ 3.141529, stddev = 0.0015, error = 0.0020%
ray_data_trials = [ray_try_it(maxN, trials) for trials in range(5,20,2)]
trials =   5, N =      1000000:    0.54345 seconds: pi ~ 3.141614, stddev = 0.0009, error = 0.0007%
trials =   7, N =      1000000:    0.61583 seconds: pi ~ 3.140765, stddev = 0.0018, error = 0.0263%
trials =   9, N =      1000000:    0.88133 seconds: pi ~ 3.141650, stddev = 0.0016, error = 0.0018%
trials =  11, N =      1000000:    0.87660 seconds: pi ~ 3.142853, stddev = 0.0016, error = 0.0401%
trials =  13, N =      1000000:    1.07940 seconds: pi ~ 3.142746, stddev = 0.0013, error = 0.0367%
trials =  15, N =      1000000:    1.58624 seconds: pi ~ 3.142915, stddev = 0.0015, error = 0.0421%
trials =  17, N =      1000000:    1.98546 seconds: pi ~ 3.141287, stddev = 0.0020, error = 0.0097%
trials =  19, N =      1000000:    1.62800 seconds: pi ~ 3.141184, stddev = 0.0012, error = 0.0130%

The durations should be shorter than the non-Ray numbers. Let’s graph our results and see. First, we convert the data lists to NumPy arrays so they are easier to slice.

np_data_ns         = np.array(data_ns)
np_data_trials     = np.array(data_trials)
np_ray_data_ns     = np.array(ray_data_ns)
np_ray_data_trials = np.array(ray_data_trials)
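Each try_it and ray_try_it call returns a (trials, n, duration, approx_pi, stdev, error) tuple, so after np.array the plotting calls below select columns by index: [:, 0] is the number of trials, [:, 1] is \(N\), [:, 2] the duration, [:, 3] the mean estimate, and [:, 4] the standard deviation. A small sketch using two rows copied (as printed, so rounded) from the non-Ray output above:

```python
import numpy as np

# Each row mirrors the tuple returned by try_it / ray_try_it:
# (trials, n, duration, approx_pi, stdev, error)
rows = [
    (20, 10000, 0.42149, 3.141980, 0.0146, 0.0123),
    (20, 50000, 1.15373, 3.141680, 0.0069, 0.0028),
]
arr = np.array(rows)     # shape (2, 6); every value becomes float64

trials    = arr[:, 0]    # column 0: number of trials
ns        = arr[:, 1]    # column 1: N, the samples per trial
durations = arr[:, 2]    # column 2: wall-clock duration in seconds
means     = arr[:, 3]    # column 3: mean pi estimate
stddevs   = arr[:, 4]    # column 4: standard deviation across trials
print(ns, durations)
```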

First, a linear plot of the results:

two_lines = two_lines_plot(
    "N vs. Execution Times (Smaller Is Better)", 'N', 'Time', 'No Ray', 'Ray', 
    np_data_ns[:,1], np_data_ns[:,2], np_ray_data_ns[:,1], np_ray_data_ns[:,2],
    x_axis_type='linear', y_axis_type='linear')
show(two_lines, plot_width=800, plot_height=400)

For relatively small \(N\) values, the performance overhead of Ray is a larger percentage of the calculation, so the overall performance benefit is less. However, as \(N\) increases, the advantage of Ray increases. Both plots are roughly linear, because we are CPU bound, but Ray’s execution time/\(N\) is lower. On a full cluster, the times could be dramatically better for larger \(N\).

A log-log plot shows the lower-\(N\) behavior more clearly:

two_lines = two_lines_plot(
    "N vs. Execution Times (Smaller Is Better)", 'N', 'Time', 'No Ray', 'Ray', 
    np_data_ns[:,1], np_data_ns[:,2], np_ray_data_ns[:,1], np_ray_data_ns[:,2])
show(two_lines, plot_width=800, plot_height=400)

What about execution times as a function of the number of trials for a fixed \(N\)?

two_lines = two_lines_plot(
    "Trials (N=1,000,000) vs. Execution Times (Smaller Is Better)", 'Trials', 'Time', 'No Ray', 'Ray', 
    np_data_trials[:,0], np_data_trials[:,2], np_ray_data_trials[:,0], np_ray_data_trials[:,2], 
    x_axis_type='linear', y_axis_type='linear')
show(two_lines, plot_width=800, plot_height=400)

Let’s plot the approximate mean values and the standard deviations over the 20 trials for each \(N\):

pi_without_ray_plot = means_stddevs_plot(
  np_data_ns[:,1], np_data_ns[:,3], np_data_ns[:,4], title = 'π Results without Ray')
# Wrap the plot in a grid so it lays out at a better size.
pi_without_ray_grid = gridplot([[pi_without_ray_plot]], width=1000, height=400)
show(pi_without_ray_grid)

As you might expect, for low \(N\) values, the error bars are large and the mean estimate is poor, but for higher \(N\), the errors grow smaller and results converge to the correct value.
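The shrinking error bars follow from a standard binomial argument: each dart lands inside the circle with probability \(p = \pi/4\), so \(n\) is binomially distributed and the single-run estimate \(4n/N\) has standard deviation \(4\sqrt{p(1-p)/N}\). The spread therefore shrinks like \(1/\sqrt{N}\): 100 times as many darts gives a 10 times smaller spread. A short sketch evaluating that formula for the \(N\) values used above; the helper name pi_estimate_stddev is an illustrative assumption. The values are on the same order as the measured stddev column in the outputs (for example 0.0146 and 0.0212 at \(N\) = 10,000, and 0.0012 to 0.0020 at \(N\) = 1,000,000).

```python
import math

def pi_estimate_stddev(n_samples):
    """Hypothetical helper: standard deviation of the single-run estimate 4n/N,
    where n ~ Binomial(N, pi/4)."""
    p = math.pi / 4                      # probability that one dart lands inside the circle
    return 4 * math.sqrt(p * (1 - p) / n_samples)

for N in [10000, 50000, 100000, 500000, 1000000]:
    print(f"N = {N:>8}: expected stddev ~ {pi_estimate_stddev(N):.4f}")
```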

With Ray, the plot will look similar, because we did the same calculation, just faster:

pi_with_ray_plot = means_stddevs_plot(
  np_ray_data_ns[:,1], np_ray_data_ns[:,3], np_ray_data_ns[:,4], title = 'π Results with Ray')
# Wrap the plot in a grid so it lays out at a better size.
pi_with_ray_grid = gridplot([[pi_with_ray_plot]], width=1000, height=400)
show(pi_with_ray_grid)

ray.get() vs ray.wait()

Calling ray.get(ids) blocks until all the tasks corresponding to the input ids have completed. That has been fine for this tutorial so far, but what if you’re waiting for a number of tasks, where some will finish more quickly than others? What if you would like to process the completed results as they become available, even while other tasks are still running? That’s where ray.wait() helps. Here we’ll provide a brief example. For more details, see Advanced Ray: Tasks Revisited.

@ray.remote
def ray_estimate_pi2(n, trial):
    time.sleep(trial)
    return n, trial, estimate_pi(n)
def ray_try_it2(ns, trials):
    start = time.time()
    refs = [ray_estimate_pi2.remote(n, trial) for trial in trials for n in ns]
    still_running = list(refs)
    while len(still_running) > 0:
        finished, still_running = ray.wait(still_running)
        ns_trials_pis = ray.get(finished)   # won't block
        print(f'{ns_trials_pis}, elapsed time = {time.time() - start} secs')
ray_try_it2([100000,1000000,1000000], [2,4,6])
[(100000, 2, 3.1518)], elapsed time = 2.055223226547241 secs
[(1000000, 2, 3.142156)], elapsed time = 2.3092033863067627 secs
[(1000000, 2, 3.143288)], elapsed time = 2.315354108810425 secs
[(100000, 4, 3.15264)], elapsed time = 4.065950632095337 secs
[(1000000, 4, 3.140448)], elapsed time = 6.2474846839904785 secs
[(1000000, 4, 3.141444)], elapsed time = 6.523554086685181 secs
[(100000, 6, 3.14572)], elapsed time = 8.332385778427124 secs
[(1000000, 6, 3.142988)], elapsed time = 10.221294164657593 secs
[(1000000, 6, 3.140912)], elapsed time = 12.463234663009644 secs
ray.shutdown()  # "Undo ray.init()". Terminate all the processes started in this notebook.